-
Notifications
You must be signed in to change notification settings - Fork 525
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Perf: load data systems on rank 0 #4478
Conversation
📝 Walkthrough📝 WalkthroughWalkthroughThe pull request modifies the Changes
Possibly related PRs
Suggested reviewers
Tip CodeRabbit's docstrings feature is now available as part of our Early Access Program! Simply use the command Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
for more information, see https://pre-commit.ci
for more information, see https://pre-commit.ci
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
deepmd/pt/utils/dataloader.py (3)
96-97
: Consider documenting theself.systems
initialization more explicitly.Here, you add a new typed attribute, but it would be helpful to have a docstring or an inline comment indicating that this list will either be populated with real datasets on rank 0 or with dummy placeholders on other ranks. This clarifies the rank-dependent data flow for future maintainers.
103-104
: Explore building partial placeholders instead of a full list of None.Currently, you allocate a “None” list for all systems on non-zero ranks. This is fine, but consider if there's an even lighter or more descriptive placeholder structure (e.g., a small object with essential metadata) to prevent confusion about what these positions represent during debugging. This can help future readers who might assume that "None" indicates an error rather than a deferred load.
105-107
: Strengthen your broadcast verification.The assertion only checks if the last dataset is non-None. Consider verifying that none of the items in
self.systems
remain None to ensure a complete broadcast. For example:
assert all(s is not None for s in self.systems)
This reduces the risk of partial broadcast issues going unnoticed.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
deepmd/pt/utils/dataloader.py
(1 hunks)
🔇 Additional comments (1)
deepmd/pt/utils/dataloader.py (1)
98-102
: Evaluate large-scale broadcasting overhead.
When rank 0 constructs numerous (or very large) datasets and then broadcasts them, it might be memory-intensive or time-consuming in large-scale distributed settings. Consider verifying that each dataset remains lightweight enough for object broadcasting. If necessary, explore an alternative approach (e.g., distributing only minimal metadata) to avoid potential OOM or performance bottlenecks.
Why do you expect each rank loads the same data? |
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## devel #4478 +/- ##
==========================================
+ Coverage 84.41% 84.58% +0.17%
==========================================
Files 670 675 +5
Lines 62147 63580 +1433
Branches 3487 3488 +1
==========================================
+ Hits 52464 53782 +1318
- Misses 8556 8673 +117
+ Partials 1127 1125 -2 ☔ View full report in Codecov by Sentry. |
Good question. Currently every rank reads the config from the same |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
deepmd/pt/utils/dataloader.py (1)
96-106
: Consider memory management and error handling improvementsWhile centralizing data loading to rank 0 reduces file system stress, there are several areas for improvement:
- Memory Management: Loading all systems on rank 0 before broadcasting could cause memory pressure on that node.
- Error Handling: Add explicit error handling for broadcast failures.
- Verification: The assertion only checks the last system, consider verifying all systems.
Consider these improvements:
self.systems: list[DeepmdDataSetForLoader] = [] global_rank = dist.get_rank() if dist.is_initialized() else 0 if global_rank == 0: log.info(f"Constructing DataLoaders from {len(systems)} systems") - with Pool(os.cpu_count()) as pool: + # Use a configurable number of workers to avoid overwhelming the system + n_workers = min(os.cpu_count(), 8) # or make this configurable + with Pool(n_workers) as pool: self.systems = pool.map(construct_dataset, systems) else: self.systems = [None] * len(systems) # type: ignore if dist.is_initialized(): - dist.broadcast_object_list(self.systems) - assert self.systems[-1] is not None + try: + dist.broadcast_object_list(self.systems) + # Verify all systems were received correctly + assert all(system is not None for system in self.systems), "Some systems failed to broadcast" + except Exception as e: + log.error(f"Failed to broadcast systems: {str(e)}") + raise
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
deepmd/pt/utils/dataloader.py
(1 hunks)
🔇 Additional comments (1)
deepmd/pt/utils/dataloader.py (1)
96-106
: Distributed design explanation: Same data, different sampling
The implementation correctly handles distributed training by:
- Loading data once on rank 0 and broadcasting to all ranks (reducing I/O)
- Using separate DistributedSampler instances per rank (ensuring different samples per rank)
This is a standard pattern in distributed deep learning where the dataset is shared but each rank samples differently.
@njzjz: Regarding your question about why each rank loads the same data - while ranks receive identical data, they use different samplers to access different portions during training. This approach reduces file system load while maintaining training correctness.
Let's verify the distributed sampling behavior:
✅ Verification successful
Distributed implementation is correct and follows best practices
The code correctly implements distributed data loading and sampling:
- Data is efficiently loaded once on rank 0 and broadcast to other ranks
- Each rank gets its own DistributedSampler instance when distributed mode is active (
dist.is_available() and dist.is_initialized()
) - Samplers are properly initialized with the dataset, ensuring each rank gets different portions of data during training
- The implementation follows PyTorch's recommended pattern for distributed training
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that the code maintains distributed sampling after loading data
# Check for DistributedSampler usage
rg "DistributedSampler" --type py
# Verify sampler initialization is rank-aware
ast-grep --pattern 'DistributedSampler($system)'
Length of output: 315
Script:
#!/bin/bash
# Let's verify the complete context around DistributedSampler usage
# Check the full context of sampler initialization
rg "DistributedSampler" -B 5 -A 5 deepmd/pt/utils/dataloader.py
# Check if there's any rank-specific logic in sampler setup
ast-grep --pattern 'def _setup_samplers(self) {
$$$
}'
Length of output: 1241
I don't get your point. I believe the data is loaded asynchronously. |
I see. What you mean is not all ranks are accessing the same data files, and this is correct. The case is when the data is stored in a shared file system with a bandwidth limit. |
So it just affects the initialization of the data class? Well, I am not sure how much data is loaded in the initialization step. |
Signed-off-by: Chun Cai <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
deepmd/pt/utils/dataloader.py (1)
96-106
: Implementation looks good with room for robustness improvements.The changes effectively implement the PR objective of loading data only on rank 0 and broadcasting to other ranks. The implementation is clean and handles both distributed and non-distributed scenarios correctly.
Consider these improvements for handling large datasets:
- Memory efficiency: Consider implementing chunked broadcasts for large datasets to manage memory usage better.
- Error handling: Add timeout and error handling for broadcast operations:
try: if dist.is_initialized(): # Consider chunking for large datasets chunk_size = 10 # or calculate based on system size for i in range(0, len(systems), chunk_size): chunk = self.systems[i:i + chunk_size] with torch.cuda.device('cpu'): dist.broadcast_object_list(chunk, timeout=timedelta(minutes=5)) self.systems[i:i + chunk_size] = chunk except Exception as e: log.error(f"Failed to broadcast systems: {str(e)}") raise
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
deepmd/pt/utils/dataloader.py
(1 hunks)
🔇 Additional comments (1)
deepmd/pt/utils/dataloader.py (1)
96-106
: Verify sampler independence across ranks.
The implementation preserves sampler independence across ranks as required. Let's verify this behavior:
✅ Verification successful
Sampler independence across ranks is properly implemented
The verification confirms that sampler independence is correctly maintained:
- Each rank gets its own
DistributedSampler
instance for each system - The seed initialization is handled properly through
setup_seed()
which sets seeds for:- PyTorch's main RNG (
torch.manual_seed
) - CUDA RNG (
torch.cuda.manual_seed_all
) - DeepMD's custom RNG (
dp_random.seed
)
- PyTorch's main RNG (
- The sampler initialization is independent of the data loading changes, as it occurs after the data broadcast
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that samplers maintain independence across ranks
# Look for sampler seed initialization or rank-specific sampling logic
# Check for rank-specific sampling logic
rg -A 5 "DistributedSampler|WeightedRandomSampler" deepmd/pt/utils/dataloader.py
# Check for seed-related configurations
rg "seed|random" deepmd/pt/utils/dataloader.py
Length of output: 1197
Co-authored-by: Jinzhe Zeng <[email protected]> Signed-off-by: Chun Cai <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
deepmd/pt/utils/dataloader.py (2)
98-103
: Add error handling for parallel data loadingWhile the rank-based loading implementation is efficient, consider adding error handling for the pool.map operation to gracefully handle potential failures during data loading.
if global_rank == 0: log.info(f"Constructing DataLoaders from {len(systems)} systems") with Pool(max(1, env.NUM_WORKERS)) as pool: - self.systems = pool.map(construct_dataset, systems) + try: + self.systems = pool.map(construct_dataset, systems) + except Exception as e: + log.error(f"Failed to load data systems: {str(e)}") + raise
96-106
: Consider chunked broadcasting for very large datasetsThe current implementation successfully achieves the PR's objective of reducing file system stress by loading data only on rank 0. However, for very large datasets, consider implementing a chunked broadcasting mechanism to:
- Reduce memory pressure during broadcast
- Allow for progress tracking
- Enable partial retry on failure
Example approach:
def broadcast_in_chunks(obj_list, chunk_size=1000): """Broadcast large lists in chunks to manage memory better""" for i in range(0, len(obj_list), chunk_size): chunk = obj_list[i:i + chunk_size] dist.broadcast_object_list(chunk) yield chunk
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
deepmd/pt/utils/dataloader.py
(1 hunks)
🔇 Additional comments (2)
deepmd/pt/utils/dataloader.py (2)
96-97
: LGTM: Proper initialization with distributed awareness
The initialization of the systems list and rank determination is well-implemented with appropriate fallback for non-distributed scenarios.
104-106
: Consider adding size verification and timeout mechanism
The broadcast implementation could benefit from additional safeguards:
- Verify the total size of systems before broadcast to prevent OOM
- Add a timeout mechanism for the broadcast operation
Let's verify the potential impact of large datasets:
The current implementation loads data on each rank. This will stress the file system.
In this PR, only rank 0 will load data systems, and it will be broadcasted to each rank.
The data sampler initialized later will still use the exclusive seed of each rank.
Summary by CodeRabbit
New Features
Bug Fixes